package cn.aohan.delayedqueue.listener;

import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Delayed task listener.
 *
 * <p>When the application has started, {@link #run(ApplicationArguments)} launches a daemon
 * thread that repeatedly polls the blocking deque registered under
 * {@link QueueConstant#TEST_DELAYED_TASK_QUEUE} and logs every task it receives.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class DelayedTaskListener implements ApplicationRunner {

    private final DelayedQueueProvider delayedQueueProvider;

    /**
     * Starts the listener for the test delayed-task queue.
     *
     * @param args application arguments (unused)
     * @throws Exception declared by {@link ApplicationRunner}; not thrown by this implementation directly
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        delayedTaskHandle(QueueConstant.TEST_DELAYED_TASK_QUEUE);
    }

    /**
     * Starts a daemon thread that consumes tasks from the blocking deque with the given name.
     *
     * <p>The method returns immediately; consumption happens on the newly started thread.
     * The thread stops when it is interrupted.
     *
     * @param delayedQueueName name passed to {@link DelayedQueueProvider#getBlockingDeque} to look up the deque
     */
    public void delayedTaskHandle(String delayedQueueName) {
        // A named thread is much easier to identify in logs and thread dumps.
        final Thread thread = new Thread(
                () -> consumeQueue(delayedQueueName),
                "delayed-task-listener-" + delayedQueueName);
        // Daemon so that this endless consumer never blocks JVM shutdown.
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Polls the deque until the current thread is interrupted, logging each task received.
     *
     * @param delayedQueueName name of the deque to consume
     */
    private void consumeQueue(String delayedQueueName) {
        final RBlockingDeque<DelayedTaskInfo> blockingDeque = delayedQueueProvider.getBlockingDeque(delayedQueueName);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Take the next available task, waiting up to the timeout; null means the wait timed out.
                // NOTE(review): presumably elements only appear here once their delay has expired - confirm against DelayedQueueProvider.
                final DelayedTaskInfo delayedTaskInfo = blockingDeque.poll(2, TimeUnit.MINUTES);
                if (Objects.isNull(delayedTaskInfo)) {
                    continue;
                }
                log.info("DelayedTask task :[{}]", delayedTaskInfo);
            } catch (InterruptedException e) {
                // Interruption is a stop request: restore the flag and leave the loop
                // instead of swallowing it and polling forever.
                Thread.currentThread().interrupt();
                log.warn("DelayedTaskListener#delayedTaskHandle interrupted, stop listening delayedQueueName:[{}]", delayedQueueName);
                return;
            } catch (Exception e) {
                // Any other failure is logged and polling continues, so one bad poll does not kill the listener.
                log.error("DelayedTaskListener#delayedTaskHandle error delayedQueueName:[{}]", delayedQueueName, e);
            }
        }
    }
}
